package org.yaac.server.egql.processor;

import static com.google.common.collect.Collections2.transform;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Sets.newHashSet;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

import org.yaac.server.egql.GroupByClause;
import org.yaac.server.egql.SelectStatement;
import org.yaac.server.egql.evaluator.AggregationEvaluator;
import org.yaac.server.egql.evaluator.EvaluationResult;
import org.yaac.server.egql.evaluator.Evaluator;
import org.yaac.server.egql.evaluator.aggregator.Aggregator;
import org.yaac.server.egql.processor.ProcessData.ProcessDataRecord;
import org.yaac.shared.egql.ResultStatus;
import org.yaac.shared.file.FileDownloadPath;

import com.google.common.base.Function;
import com.google.common.collect.HashBasedTable;

import com.google.common.collect.Table;

/**
 * Select phase of an EGQL statement.
 * 
 * For every batch passed to {@link #process(ProcessContext, ProcessData)} the records
 * rejected by the where clause are dropped. Remaining records are either evaluated
 * against the select items and emitted straight away (plain query), or folded into
 * per-group aggregators kept on this instance (group by / aggregation only query).
 * Grouped rows are only emitted once the context reports {@link ResultStatus#FINISHED}.
 * 
 * @author Max Zhu (thebbsky@gmail.com)
 *
 */
public class SelectProccesor implements Processor {

	private static final long serialVersionUID = 1L;

	@SuppressWarnings("unused")
	private static final Logger logger = Logger.getLogger(SelectProccesor.class.getName());
	
	/**
	 * statement whose select / where / group by / having clauses drive this processor
	 */
	private SelectStatement stmt;
	
	/**
	 * all group by keys seen so far, it's a super set of the row keys in groupByTable, 
	 * 
	 * because it is NOT mandatory to have aggregators when using group by functions
	 * eg, when executing query select state from job group by state, 
	 * this set is not empty while group by table will be empty
	 */
	private Set<Map<String, Object>> groupByRecords;
	
	/**
	 * group by key x aggregation evaluator -> aggregator accumulating that group
	 */
	private Table<Map<String, Object>, AggregationEvaluator, Aggregator> groupByTable;
	
	/**
	 * No-arg constructor, leaves every field unset. 
	 * 
	 * NOTE(review): not called from within this class, presumably kept for a 
	 * serialization framework - confirm before removing
	 */
	@SuppressWarnings("unused")
	private SelectProccesor(){}
	
	/**
	 * @param stmt select statement to process, starts with no accumulated groups
	 */
	public SelectProccesor(SelectStatement stmt) {
		this.stmt = stmt;
		this.groupByRecords = newHashSet();
		this.groupByTable = HashBasedTable.create();
	}

	/**
	 * Processes one batch of records.
	 * 
	 * @param context used to detect the last batch, grouped rows are appended when its status is FINISHED
	 * @param input batch of records to select from
	 * @return rows produced by this batch, 
	 * 			for group by / aggregation only queries rows only appear in the FINISHED batch
	 */
	@Override
	public ProcessData process(ProcessContext context, ProcessData input) {
		ProcessData output = new ProcessData();
		
		List<Evaluator> selectItems = stmt.getSelectClause().getItems();
		
		boolean isGroupByQuery = stmt.isGroupByQuery();
		boolean isAggregationOnlyQuery = stmt.aggregationEvaluatorOnly();
		Collection<AggregationEvaluator> aggregationEvaluators = stmt.getAllAggregationEvaluators();
		
		// for aggregator only query(those without group by) 
		// if no record matches where clause, the query should still return one row rather than an empty resultset
		// so the empty group by key is registered up front (the set ignores it when it is already there)
		if (isAggregationOnlyQuery) {
			Map<String, Object> emptyGroupByRecord = newLinkedHashMap();
			groupByRecords.add(emptyGroupByRecord);
		}
		
		boolean accumulate = isGroupByQuery || isAggregationOnlyQuery;
		
		for (ProcessDataRecord record : input.getRecords()) {
			if (stmt.rejectedByWhereClause(record)) {
				continue;
			}
			
			if (accumulate) {
				// nothing is written yet, grouped rows are emitted with the last batch
				aggregate(record, aggregationEvaluators);
			} else {
				output.addRecord(toOutputRecord(evaluateSelectItems(selectItems, record)));
			}
		}
		
		// last batch finished, append all group by results
		if (context.getStatus() == ResultStatus.FINISHED) {
			for (Map<String, Object> rowKey : this.groupByRecords) {
				ProcessDataRecord groupRecord = groupRecord(rowKey);
				
				// process having clause
				if (stmt.rejectedByHavingClause(groupRecord)) {
					continue;
				}
				
				output.addRecord(toOutputRecord(evaluateSelectItems(selectItems, groupRecord)));
			}
		}
		
		return output;
	}
	
	/**
	 * Registers the group of the record and feeds the record into every aggregator of that group.
	 * 
	 * The group by key is built once and shared by the set and the table, 
	 * it is never modified after being handed over.
	 * 
	 * @param record record which passed the where clause
	 * @param aggregationEvaluators all aggregation evaluators of the statement, may be empty
	 */
	private void aggregate(ProcessDataRecord record, Collection<AggregationEvaluator> aggregationEvaluators) {
		Map<String, Object> groupKey = groupByRecord(record);
		
		// step 1 : ensure current group by record
		// it is possible that a group by query doesn't have any aggregation function
		// in that case the group must still be remembered, since final results are generated from groupByRecords
		groupByRecords.add(groupKey);
		
		// step 2 : evaluate all aggregation functions
		for (AggregationEvaluator e : aggregationEvaluators) {
			Aggregator agg = groupByTable.get(groupKey, e);
			
			if (agg == null) {
				// first record of this group for this evaluator
				agg = e.getType().newAggregator();
				groupByTable.put(groupKey, e, agg);
			}
			
			e.aggregate(record, agg);
		}
	}
	
	/**
	 * Evaluates every select item against the record.
	 * 
	 * @param selectItems items of the select clause
	 * @param record record to evaluate against
	 * @return evaluation results keyed by the text of the select item, in select order
	 */
	private static Map<String, EvaluationResult> evaluateSelectItems(
			List<Evaluator> selectItems, ProcessDataRecord record) {
		Map<String, EvaluationResult> resultRow = newLinkedHashMap();
		
		for (Evaluator e : selectItems) {
			resultRow.put(e.getText(), e.evaluate(record));
		}
		
		return resultRow;
	}
	
	/**
	 * Wraps an evaluated row as output record, the row is expanded eagerly.
	 * 
	 * @param resultRow evaluated select items
	 * @return record backed by the expanded row, file references are not supported
	 */
	private ProcessDataRecord toOutputRecord(Map<String, EvaluationResult> resultRow) {
		final Map<String, EvaluationResult> expandedRow = expandSelectAllAndResolveDuplicateName(resultRow);
		
		return new ProcessDataRecord() {
			@Override
			public EvaluationResult lookup(String name) {
				return expandedRow.get(name);
			}
			
			@Override
			public Iterable<EvaluationResult> asIterable() {
				return expandedRow.values();
			}

			@Override
			public FileDownloadPath lookupFileReference(Integer index) {
				throw new IllegalArgumentException();
			}
		};
	}
	
	/**
	 * Exposes one accumulated group as a record, 
	 * so having clause and select items can be evaluated against it.
	 * 
	 * @param rowKey group by key, as stored in groupByRecords
	 * @return record resolving aggregator results first and group by values second
	 */
	private ProcessDataRecord groupRecord(final Map<String, Object> rowKey) {
		return new ProcessDataRecord() {
			@Override
			public EvaluationResult lookup(String name) {
				// must resolve aggregators first, because count(1) is a valid property name in appengine
				for (Map.Entry<AggregationEvaluator, Aggregator> entry : groupByTable.row(rowKey).entrySet()) {
					String title = entry.getKey().getText();
					
					if (title.equals(name)) {
						return new EvaluationResult(entry.getValue().getResult()).withTitle(title);
					}
				}
				
				// fallback with group by values
				// an aggregation ONLY query without any matching record ends up here as well:
				// its empty group has no aggregators, so the lookup resolves to a null payload
				return new EvaluationResult(rowKey.get(name)).withTitle(name);
			}
			
			@Override
			public Iterable<EvaluationResult> asIterable() {
				// only group by values are listed here, aggregator results are reachable via lookup
				return transform(rowKey.keySet(), new Function<String, EvaluationResult>(){
					@Override
					public EvaluationResult apply(String name) {
						return new EvaluationResult(rowKey.get(name)).withTitle(name);
					}
				});
			}

			@Override
			public FileDownloadPath lookupFileReference(Integer index) {
				throw new IllegalArgumentException();
			}
		};
	}
	
	/**
	 * generate group by record based on the value and group by key
	 * 
	 * @param record record to read the group by properties from
	 * @return group by property name -> payload of that property, in group by order, 
	 * 			empty when the statement has no group by clause
	 */
	private Map<String, Object> groupByRecord(ProcessDataRecord record) {
		Map<String, Object> groupByRecord = newLinkedHashMap();
		
		GroupByClause clause = stmt.getGroupByClause();
		
		if (clause != null) {
			for (String propertyName : clause.itemsInStr()) {
				groupByRecord.put(propertyName, record.lookup(propertyName).getPayload());
			}
		}
		
		return groupByRecord;
	}
	
	/**
	 * this is normally invoked at end of the select process
	 * 
	 * why do we need to expand? because the next phase of nested selection query / update / insert statement depends on the expanded one
	 * why don't we expand it in DatastoreLoader? in current select phase * is evaluated as a single value, so we need DatastoreLoader to provide a suppressed one
	 * 
	 * @param input evaluated select items, a payload of type EvaluationResult [] is treated as the result of *
	 * @return row with every * replaced by its elements (keyed by their property title), 
	 * 			repeated names get a _1, _2, ... suffix
	 */
	private Map<String, EvaluationResult> expandSelectAllAndResolveDuplicateName(Map<String, EvaluationResult> input) {
		Map<String, EvaluationResult> result = newLinkedHashMap();
		
		// name -> number of times it has been repeated so far
		Map<String, Integer> duplicateNameCounter = newHashMap();
		
		for (Map.Entry<String, EvaluationResult> entry : input.entrySet()) {
			EvaluationResult r = entry.getValue();
			
			if (r.getPayload() instanceof EvaluationResult []) {	// expand *
				for (EvaluationResult elem : (EvaluationResult [])r.getPayload()) {
					result.put(resolveDuplicateName(duplicateNameCounter, elem.propertyTitle()), elem);
				}
			} else { // simply copy to new result
				result.put(resolveDuplicateName(duplicateNameCounter, entry.getKey()), r);
			}
		}
		
		return result;
	}
	
	/**
	 * Returns the name untouched on first use, and name_N for its Nth repetition.
	 * 
	 * NOTE(review): a generated name such as x_1 is not checked against a real property 
	 * called x_1, the later one would replace the earlier one in the row - confirm whether this can happen
	 * 
	 * @param duplicateNameCounter repetition counters, updated by this call
	 * @param propertyName name to make unique
	 * @return name to use as key
	 */
	private static String resolveDuplicateName(Map<String, Integer> duplicateNameCounter, String propertyName) {
		Integer counter = duplicateNameCounter.get(propertyName);
		
		if (counter == null) {
			duplicateNameCounter.put(propertyName, 0);
			return propertyName;
		}
		
		int next = counter + 1;
		duplicateNameCounter.put(propertyName, next);
		return propertyName + "_" + next;
	}
}
